package com.czxy

import java.util.Properties

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

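/*
 * Political affiliation (政治面貌) tag job. Tag values: 1 = ordinary citizen (群众),
 * 2 = Party member (党员), 3 = no party affiliation / independent (无党派人士).
 */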
/**
 * Builds the political-affiliation tag for every user and merges it into the tags already
 * stored in HBase.
 *
 * Steps:
 *  1. Read the four-level tag (id = 80) rule from MySQL to locate the source data in HBase.
 *  2. Read its five-level child tags (pid = 80), each mapping a source value to a tag id.
 *  3. Tag each user.
 *  4. Merge with the previously stored tags.
 *  5. Write the result back to HBase.
 */
object PoliticalFaceTag {

  /** Id of the four-level political-affiliation tag in `tbl_basic_tag`; its children have this `pid`. */
  private val FourTagId: Int = 80

  /** Project data source used for every HBase read and write. */
  private val HBaseSource: String = "com.czxy.tools.HBaseDataSource"

  /** HBase table, column family and columns holding the accumulated user tags. */
  private val ResultTable: String = "test"
  private val ResultFamily: String = "detail"
  private val ResultFields: String = "userId,tagsId"

  def main(args: Array[String]): Unit = {
    // 1. Spark session used to read MySQL and HBase.
    val spark: SparkSession =
      SparkSession.builder().master("local[*]").appName("PoliticalFaceTag").getOrCreate()

    // Symbol-to-Column conversion ('id) and SQL functions (udf, coalesce).
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // 2. Tag metadata in MySQL.
    // NOTE(review): credentials are hard-coded in the JDBC URL; move them to configuration.
    val url = "jdbc:mysql://bd001:3306/tags_new?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&user=root&password=123456"
    val table = "tbl_basic_tag"
    val mysqlConn: DataFrame = spark.read.jdbc(url, table, new Properties)

    // 3. Four-level tag: its rule describes where the source data lives in HBase.
    //    Filter before projecting so the filtered column is still present.
    val fourRule: String = mysqlConn
      .where(s"id=$FourTagId")
      .select("rule")
      .collect()
      .headOption
      // Explicit type argument: without one, getAs's type parameter is inferred as Nothing.
      .map(_.getAs[Any]("rule").toString)
      .getOrElse(throw new IllegalStateException(s"four-level tag id=$FourTagId not found in $table"))
    val hBaseMeta: HBaseMeta = getHBaseMeta(parseRule(fourRule))
    println(hBaseMeta.selectFields)

    // 4. Five-level tags: each rule is a source value, each id the tag assigned for it.
    val fiverList: List[TagRule] = mysqlConn
      .where(s"pid=$FourTagId")
      .select("id", "rule")
      .collect()
      .map(row => TagRule(row.getAs[Any]("id").toString.toInt, row.getAs[Any]("rule").toString))
      .toList
    // rule -> tag id; for duplicate rules the later entry wins, matching the previous linear scan.
    val ruleToTagId: Map[String, Int] = fiverList.map(tag => tag.rule -> tag.id).toMap

    // 5. Source data from HBase, located via the four-level rule.
    val hbaseDatas: DataFrame = readHBase(
      spark, hBaseMeta, hBaseMeta.hbaseTable, hBaseMeta.family, hBaseMeta.selectFields)

    // 6. Tag each user. Values without a matching five-level rule get tag id 0.
    // NOTE(review): 0 is then merged into the stored tags like a real id; confirm this is intended.
    val getPF: UserDefinedFunction = udf((rule: String) => ruleToTagId.getOrElse(rule, 0))
    val pfTags: DataFrame = hbaseDatas.select('id as "userId", getPF('PF) as "tagsId")
    pfTags.show()

    // 7. Previously stored tags, to be merged with the new ones.
    val oldData: DataFrame = readHBase(spark, hBaseMeta, ResultTable, ResultFamily, ResultFields)
    oldData.show()

    // Union of both comma-separated id lists without duplicates. null and "" count as empty,
    // because the outer join below yields null on the side a user is missing from.
    val getAllTags: UserDefinedFunction = udf((oldId: String, newId: String) =>
      Seq(oldId, newId)
        .filter(ids => ids != null && ids.nonEmpty)
        .flatMap(_.split(","))
        .filter(_.nonEmpty)
        .distinct
        .mkString(","))

    // Full outer join keeps users that appear on only one side. An inner join dropped them,
    // so against an empty result table nothing was ever written.
    val realAllTags: DataFrame = oldData
      .join(pfTags, oldData("userId") === pfTags("userId"), "full")
      .select(
        coalesce(oldData("userId"), pfTags("userId")).as("userId"),
        getAllTags(oldData("tagsId"), pfTags("tagsId")).as("tagsId"))
    realAllTags.show()

    // 8. Write the merged tags back to HBase.
    realAllTags.write.format(HBaseSource)
      .option("zkHosts", hBaseMeta.zkHosts)
      .option(HBaseMeta.ZKPORT, hBaseMeta.zkPort)
      .option(HBaseMeta.HBASETABLE, ResultTable)
      .option(HBaseMeta.FAMILY, ResultFamily)
      .option(HBaseMeta.SELECTFIELDS, ResultFields)
      .save()

    spark.stop()
  }

  /**
   * Loads the columns `fields` of column family `family` from HBase table `table`.
   * Uses the project HBase data source and the ZooKeeper settings carried by `meta`.
   */
  private def readHBase(spark: SparkSession, meta: HBaseMeta, table: String,
                        family: String, fields: String): DataFrame =
    spark.read.format(HBaseSource)
      // NOTE(review): literal key kept from the original; the other keys use HBaseMeta
      // constants. Confirm whether HBaseMeta.ZKHOSTS is meant here.
      .option("zkHosts", meta.zkHosts)
      .option(HBaseMeta.ZKPORT, meta.zkPort)
      .option(HBaseMeta.HBASETABLE, table)
      .option(HBaseMeta.FAMILY, family)
      .option(HBaseMeta.SELECTFIELDS, fields)
      .load()

  /**
   * Parses a four-level tag rule of the form `key1=value1##key2=value2##...` into a map.
   * Only the first `=` of each pair separates key from value, so values may contain `=`.
   *
   * @throws ArrayIndexOutOfBoundsException if a pair contains no `=`
   */
  private def parseRule(rule: String): Map[String, String] =
    rule.split("##").map { pair =>
      val kv = pair.split("=", 2)
      kv(0) -> kv(1)
    }.toMap

  /**
   * Builds the HBase connection metadata from a parsed four-level rule.
   *
   * @param fourMap key/value pairs of the four-level rule, keyed by the HBaseMeta constants
   * @return the metadata; any key missing from `fourMap` becomes ""
   */
  def getHBaseMeta(fourMap: Map[String, String]): HBaseMeta = {
    def field(key: String): String = fourMap.getOrElse(key, "")

    HBaseMeta(
      field(HBaseMeta.INTYPE),
      field(HBaseMeta.ZKHOSTS),
      field(HBaseMeta.ZKPORT),
      field(HBaseMeta.HBASETABLE),
      field(HBaseMeta.FAMILY),
      field(HBaseMeta.SELECTFIELDS),
      field(HBaseMeta.ROWKEY)
    )
  }
}
